1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import rx.Observable.Operator;
19 import rx.Subscriber;
20 import rx.functions.Func1;
21 import rx.functions.Func2;
22
23
24
25
26
27
28
29 public final class OperatorTakeWhile<T> implements Operator<T, T> {
30
31 private final Func2<? super T, ? super Integer, Boolean> predicate;
32
33 public OperatorTakeWhile(final Func1<? super T, Boolean> underlying) {
34 this(new Func2<T, Integer, Boolean>() {
35 @Override
36 public Boolean call(T input, Integer index) {
37 return underlying.call(input);
38 }
39 });
40 }
41
42 public OperatorTakeWhile(Func2<? super T, ? super Integer, Boolean> predicate) {
43 this.predicate = predicate;
44 }
45
46 @Override
47 public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
48 Subscriber<T> s = new Subscriber<T>(subscriber, false) {
49
50 private int counter = 0;
51
52 private boolean done = false;
53
54 @Override
55 public void onNext(T args) {
56 boolean isSelected;
57 try {
58 isSelected = predicate.call(args, counter++);
59 } catch (Throwable e) {
60 done = true;
61 subscriber.onError(e);
62 unsubscribe();
63 return;
64 }
65 if (isSelected) {
66 subscriber.onNext(args);
67 } else {
68 done = true;
69 subscriber.onCompleted();
70 unsubscribe();
71 }
72 }
73
74 @Override
75 public void onCompleted() {
76 if (!done) {
77 subscriber.onCompleted();
78 }
79 }
80
81 @Override
82 public void onError(Throwable e) {
83 if (!done) {
84 subscriber.onError(e);
85 }
86 }
87
88 };
89 subscriber.add(s);
90 return s;
91 }
92
93 }